#include <algorithm>
#include <sstream>
#include <string>

#include "common/config.h"
#include "common/exception.h"
#include "common/logger.h"
#include "common/rid.h"
#include "storage/index/b_plus_tree.h"
#include "storage/index/index_iterator.h"
#include "storage/index/stl_comparator_wrapper.h"

namespace bustub {

INDEX_TEMPLATE_ARGUMENTS
BPLUSTREE_TYPE::BPlusTree(std::string name, page_id_t header_page_id, BufferPoolManager *buffer_pool_manager,
                          const KeyComparator &comparator, int leaf_max_size, int internal_max_size)
    : index_name_(std::move(name)),
      bpm_(buffer_pool_manager),
      comparator_(std::move(comparator)),
      leaf_max_size_(leaf_max_size),
      internal_max_size_(internal_max_size),
      header_page_id_(header_page_id) {
  WritePageGuard guard = bpm_->FetchPageWrite(header_page_id_);
  auto root_page = guard.AsMut<BPlusTreeHeaderPage>();
  root_page->root_page_id_ = INVALID_PAGE_ID;
  LOG_DEBUG("index_name_=%s, leaf_max_size_=%d, internal_max_size_=%d", index_name_.c_str(), leaf_max_size_,
            internal_max_size_);
}

/*
 * Helper function to decide whether current b+tree is empty
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::IsEmpty() const -> bool {
  auto guard = bpm_->FetchPageBasic(header_page_id_);
  auto root_page = guard.As<BPlusTreeHeaderPage>();
  return root_page->root_page_id_ == INVALID_PAGE_ID;
}
/*****************************************************************************
 * SEARCH
 *****************************************************************************/
/*
 * Return the only value that associated with input key
 * This method is used for point query
 * @return : true means key exists
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::GetValue(const KeyType &key, std::vector<ValueType> *result, Transaction *txn) -> bool {
  std::shared_lock<std::shared_mutex> lock(mutex_);
  // LOG_DEBUG("key=%ld", key.ToString());
  if (IsEmpty()) {
    return false;
  }
  // Declaration of context instance.
  Context ctx;
  (void)ctx;
  auto page_id = GetRootPageId();
  auto leaf_page_id = FindLeaf(page_id, key, ctx);
  int ignore = 0;
  bool succ = bpm_->FetchPageBasic(leaf_page_id).template As<LeafPage>()->GetValue(key, result, comparator_, ignore);
  if (!succ) {
    // LOG_DEBUG("key=%ld", key.ToString());
  }
  return succ;
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::Insert(const KeyType &key, const ValueType &value, Transaction *txn) -> bool {
  std::unique_lock<std::shared_mutex> lock(mutex_);
  // LOG_DEBUG("key=%ld, value=%s", key.ToString(), value.ToString().c_str());
  bool succ = InsertDetail(key, value, txn);
  if (succ) {
    // Print(bpm_);
  }
  return succ;
}

/*****************************************************************************
 * INSERTION
 *****************************************************************************/
/*
 * Insert constant key & value pair into b+ tree
 * if current tree is empty, start new tree, update root page id and insert
 * entry, otherwise insert into leaf page.
 * @return: since we only support unique key, if user try to insert duplicate
 * keys return false, otherwise return true.
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::InsertDetail(const KeyType &key, const ValueType &value, Transaction *txn) -> bool {
  // Declaration of context instance.
  // 教材Figure14.16,14.17
  Context ctx;
  if (IsEmpty()) {
    page_id_t page_id;
    // 根节点赋值
    auto header_guard = bpm_->FetchPageBasic(header_page_id_);
    auto guard = bpm_->NewPageGuarded(&page_id);
    header_guard.AsMut<BPlusTreeHeaderPage>()->root_page_id_ = page_id;
    header_guard.Drop();
    auto *leaf_page = guard.AsMut<LeafPage>();
    leaf_page->Init(leaf_max_size_);
    leaf_page->Insert(key, value, comparator_);
    return true;
  }

  // 寻找叶子页
  auto root_page_id = GetRootPageId();
  auto leaf_page_id = FindLeafForWrite(root_page_id, key, ctx);
  auto leaf_page_guard = bpm_->FetchPageBasic(leaf_page_id);
  auto leaf_page = leaf_page_guard.template AsMut<LeafPage>();

  int ignore = 0;
  std::vector<ValueType> result;
  bool has_value = leaf_page->GetValue(key, &result, comparator_, ignore);
  if (has_value) {
    return false;
  }

  if (leaf_page->GetSize() + 1 < leaf_page->GetMaxSize()) {
    leaf_page->Insert(key, value, comparator_);
    return true;
  }

  // split leaf_page,
  page_id_t new_page_id;
  auto new_guard = bpm_->NewPageGuarded(&new_page_id);
  auto new_page = new_guard.AsMut<LeafPage>();
  new_page->Init(leaf_max_size_);
  char *tmp = new char[sizeof(MappingType) + static_cast<size_t>(BUSTUB_PAGE_SIZE)];
  auto t = reinterpret_cast<LeafPage *>(tmp);
  t->CopyArray(leaf_page->GetArray(), 0, leaf_page->GetSize());
  t->Insert(key, value, comparator_);
  new_page->SetNextPageId(leaf_page->GetNextPageId());
  leaf_page->SetNextPageId(new_page_id);
  int tmp_size = t->GetSize();
  int tmp_size1 = ceil(static_cast<double>(tmp_size - 1) / 2);
  leaf_page->CopyArray(t->GetArray(), 0, tmp_size1);
  new_page->CopyArray(t->GetArray(), tmp_size1, tmp_size - tmp_size1);
  delete[] tmp;
  // 递归之前Drop，防止占用太多page
  auto kk = new_page->KeyAt(0);
  new_guard.Drop();
  leaf_page_guard.Drop();
  return InsertInParent(leaf_page_id, kk, new_page_id, ctx);
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::InsertInParent(page_id_t N, KeyType KK, page_id_t NN, Context &ctx) -> bool {
  auto root_page_id = GetRootPageId();

  // 递归到根
  if (root_page_id == N) {
    page_id_t page_id;
    auto guard = bpm_->NewPageGuarded(&page_id);
    bpm_->FetchPageBasic(header_page_id_).AsMut<BPlusTreeHeaderPage>()->root_page_id_ = page_id;
    auto root_page = guard.AsMut<InternalPage>();
    root_page->Init(internal_max_size_);
    root_page->Insert(KeyType(), N, comparator_);  // key无意义, value, comparator_
    root_page->Insert(KK, NN, comparator_);        // key, value, comparator_
    return true;
  }

  // 寻找父亲
  auto parent_page_id = ctx.parent_page_id_.back();
  ctx.parent_page_id_.pop_back();
  auto parent_guard = bpm_->FetchPageBasic(parent_page_id);
  auto parent_page = parent_guard.AsMut<InternalPage>();
  if (parent_page->GetSize() + 1 <= parent_page->GetMaxSize()) {
    parent_page->Insert(KK, NN, comparator_);
    // parent以上都解锁
    return true;
  }

  // 分裂父亲
  page_id_t new_page_id;
  auto new_guard = bpm_->NewPageGuarded(&new_page_id);
  auto new_page = new_guard.AsMut<InternalPage>();
  new_page->Init(internal_max_size_);
  char *tmp = new char[sizeof(MappingType) + static_cast<size_t>(BUSTUB_PAGE_SIZE)];
  auto t = reinterpret_cast<InternalPage *>(tmp);
  t->CopyArray(parent_page->GetArray(), 0, parent_page->GetSize());
  t->Insert(KK, NN, comparator_);
  int tmp_size = t->GetSize();
  int tmp_size1 = ceil(static_cast<double>(tmp_size) / 2);
  parent_page->CopyArray(t->GetArray(), 0, tmp_size1);
  new_page->CopyArray(t->GetArray(), tmp_size1, tmp_size - tmp_size1);
  delete[] tmp;
  parent_guard.Drop();
  auto kk = new_page->KeyAt(0);
  // 内部节点key 0重置
  new_page->SetKeyAt(0, KeyType());
  new_guard.Drop();
  bool succ = InsertInParent(parent_page_id, kk, new_page_id, ctx);
  return succ;
}

/*****************************************************************************
 * REMOVE
 *****************************************************************************/
/*
 * Delete key & value pair associated with input key
 * If current tree is empty, return immediately.
 * If not, User needs to first find the right leaf page as deletion target, then
 * delete entry from leaf page. Remember to deal with redistribute or merge if
 * necessary.
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::Remove(const KeyType &key, Transaction *txn) {
  std::unique_lock<std::shared_mutex> lock(mutex_);
  // LOG_DEBUG("key=%ld", key.ToString());
  // Declaration of context instance.
  Context ctx;
  (void)ctx;
  if (IsEmpty()) {
    return;
  }

  auto root_page_id = GetRootPageId();
  auto leaf_page_id = FindLeafForWrite(root_page_id, key, ctx);
  // low key,if high key index is size-1
  // int index = 0;
  std::vector<ValueType> ignore;
  auto leaf_page_guard = bpm_->FetchPageBasic(leaf_page_id);
  auto leaf_page = leaf_page_guard.template AsMut<LeafPage>();
  // remove时判断是否存在
  bool succ = leaf_page->Remove(key, comparator_);
  if (!succ) {
    return;
  }

  int size = leaf_page->GetSize();
  int min_size = leaf_page->GetMinSize();
  leaf_page_guard.Drop();

  if (size < min_size) {
    CoalesceOrRedistribute(leaf_page_id, txn, ctx);
  }
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::AdjustRoot() {
  auto root_page_id = GetRootPageId();
  auto root_page = bpm_->FetchPageBasic(root_page_id).template AsMut<BPlusTreePage>();

  if (!root_page->IsLeafPage() && root_page->GetSize() == 1) {
    auto new_root_page_id = reinterpret_cast<InternalPage *>(root_page)->ValueAt(0);
    bpm_->FetchPageBasic(header_page_id_).AsMut<BPlusTreeHeaderPage>()->root_page_id_ = new_root_page_id;
  } else if (root_page->GetSize() == 0) {
    bpm_->FetchPageBasic(header_page_id_).AsMut<BPlusTreeHeaderPage>()->root_page_id_ = INVALID_PAGE_ID;
  }
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::CoalesceOrRedistribute(page_id_t page_id, Transaction *txn, Context &ctx) {
  auto root_page_id = GetRootPageId();
  if (root_page_id == page_id) {
    AdjustRoot();
    return;
  }
  auto page_guard = bpm_->FetchPageBasic(page_id);
  auto page = page_guard.template As<BPlusTreePage>();

  auto parent_page_id = ctx.parent_page_id_.back();
  ctx.parent_page_id_.pop_back();
  auto parent_guard = bpm_->FetchPageBasic(parent_page_id);
  auto parent_page = parent_guard.AsMut<InternalPage>();

  int index_of_page_in_parent = parent_page->ValueIndex(page_id);
  BUSTUB_ASSERT(index_of_page_in_parent >= 0, "value index invalid");
  int sibling_index = index_of_page_in_parent == 0 ? 1 : index_of_page_in_parent - 1;
  auto sibling_page_id = parent_page->ValueAt(sibling_index);
  auto sibling_page_guard = bpm_->FetchPageBasic(sibling_page_id);
  auto sibling_page = sibling_page_guard.template As<BPlusTreePage>();
  // FIXME:为啥是小于，不是小于等于?
  int size = page->GetSize();
  int sibling_size = sibling_page->GetSize();
  int max_size = page->GetMaxSize();
  page_guard.Drop();
  sibling_page_guard.Drop();
  parent_guard.Drop();
  if (!page->IsLeafPage()) {
    max_size++;
  }
  if (size + sibling_size < max_size) {
    Coalesce(sibling_page_id, page_id, parent_page_id, index_of_page_in_parent, txn, ctx);
  } else {
    Redistribute(sibling_page_id, page_id, parent_page_id, index_of_page_in_parent, txn, ctx);
  }
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::Coalesce(page_id_t sibling_page_id, page_id_t page_id, page_id_t parent_page_id, int index,
                              Transaction *txn, Context &ctx) {
  if (index == 0) {
    std::swap(page_id, sibling_page_id);
    index = 1;
  }
  auto page_guard = bpm_->FetchPageBasic(page_id);
  auto page = page_guard.template AsMut<BPlusTreePage>();
  auto sibling_page_guard = bpm_->FetchPageBasic(sibling_page_id);
  auto sibling_page = sibling_page_guard.template AsMut<BPlusTreePage>();
  auto parent_page_guard = bpm_->FetchPageBasic(parent_page_id);
  auto *parent_page = parent_page_guard.template AsMut<InternalPage>();

  if (page->IsLeafPage()) {
    reinterpret_cast<LeafPage *>(sibling_page)
        ->CopyArray(reinterpret_cast<const LeafPage *>(page)->GetArray(), 0, page->GetSize(),
                    reinterpret_cast<LeafPage *>(sibling_page)->GetSize());
    reinterpret_cast<LeafPage *>(sibling_page)
        ->SetNextPageId(reinterpret_cast<const LeafPage *>(page)->GetNextPageId());
  } else {
    reinterpret_cast<InternalPage *>(page)->SetKeyAt(0, parent_page->KeyAt(index));

    // FIXME:分左右,合并到左
    reinterpret_cast<InternalPage *>(sibling_page)
        ->CopyArray(reinterpret_cast<InternalPage *>(page)->GetArray(), 0, page->GetSize(),
                    reinterpret_cast<InternalPage *>(sibling_page)->GetSize());
  }
  page->SetSize(0);
  parent_page->Remove(index);
  int parent_size = parent_page->GetSize();
  int parent_min_size = parent_page->GetMinSize();
  page_guard.Drop();
  sibling_page_guard.Drop();
  parent_page_guard.Drop();
  if (parent_size < parent_min_size) {
    CoalesceOrRedistribute(parent_page_id, txn, ctx);
  }
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::Redistribute(page_id_t sibling_page_id, page_id_t page_id, page_id_t parent_page_id, int index,
                                  Transaction *txn, Context &ctx) {
  auto page_guard = bpm_->FetchPageBasic(page_id);
  auto page = page_guard.template AsMut<BPlusTreePage>();
  auto sibling_page_guard = bpm_->FetchPageBasic(sibling_page_id);
  auto sibling_page = sibling_page_guard.template AsMut<BPlusTreePage>();
  auto parent_page_guard = bpm_->FetchPageBasic(parent_page_id);
  auto parent_page = parent_page_guard.template AsMut<InternalPage>();

  if (index == 0) {
    // sibling_page->moveFirstToEndOf(page);
    // 从右兄弟借
    if (page->IsLeafPage()) {
      reinterpret_cast<LeafPage *>(page)->Insert(reinterpret_cast<LeafPage *>(page)->GetSize(),
                                                 reinterpret_cast<LeafPage *>(sibling_page)->Remove(0));
      parent_page->SetKeyAt(1, reinterpret_cast<LeafPage *>(sibling_page)->GetArray()[0].first);
    } else {
      reinterpret_cast<InternalPage *>(page)->Insert(reinterpret_cast<InternalPage *>(page)->GetSize(),
                                                     reinterpret_cast<InternalPage *>(sibling_page)->Remove(0));
      reinterpret_cast<InternalPage *>(page)->SetKeyAt(reinterpret_cast<InternalPage *>(page)->GetSize() - 1,
                                                       reinterpret_cast<InternalPage *>(parent_page)->KeyAt(1));
      parent_page->SetKeyAt(1, reinterpret_cast<InternalPage *>(sibling_page)->GetArray()[0].first);
      reinterpret_cast<InternalPage *>(sibling_page)->SetKeyAt(0, KeyType());
    }

  } else {
    // sibling_page->moveLastToFrontOf(page, index);
    // 从左兄弟借
    if (page->IsLeafPage()) {
      reinterpret_cast<LeafPage *>(page)->Insert(
          0, reinterpret_cast<LeafPage *>(sibling_page)
                 ->Remove(reinterpret_cast<LeafPage *>(sibling_page)->GetSize() - 1));
      parent_page->SetKeyAt(index, reinterpret_cast<LeafPage *>(page)->GetArray()[0].first);
    } else {
      reinterpret_cast<InternalPage *>(page)->Insert(
          0, reinterpret_cast<InternalPage *>(sibling_page)
                 ->Remove(reinterpret_cast<LeafPage *>(sibling_page)->GetSize() - 1));
      reinterpret_cast<InternalPage *>(page)->SetKeyAt(1, reinterpret_cast<InternalPage *>(parent_page)->KeyAt(index));
      parent_page->SetKeyAt(index, reinterpret_cast<InternalPage *>(page)->GetArray()[0].first);
      reinterpret_cast<InternalPage *>(page)->SetKeyAt(0, KeyType());
    }
  }
}

// FIXME:优化，插入删除key时更新叶子迭代器
/*****************************************************************************
 * INDEX ITERATOR
 *****************************************************************************/
/*
 * Input parameter is void, find the leftmost leaf page first, then construct
 * index iterator
 * @return : index iterator
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::Begin() -> INDEXITERATOR_TYPE {
  std::shared_lock<std::shared_mutex> lock(mutex_);
  auto page_id = GetRootPageId();
  auto guard = bpm_->FetchPageBasic(page_id);
  if (page_id == -1) {
    return End();
  }
  auto page = guard.template As<BPlusTreePage>();

  while (!page->IsLeafPage()) {
    page_id = guard.template As<InternalPage>()->ValueAt(0);
    guard = bpm_->FetchPageBasic(page_id);
    page = guard.template As<BPlusTreePage>();
  }
  return INDEXITERATOR_TYPE(page_id, 0, bpm_);
}

/*
 * Input parameter is low key, find the leaf page that contains the input key
 * first, then construct index iterator
 * @return : index iterator
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::Begin(const KeyType &key) -> INDEXITERATOR_TYPE {
  std::shared_lock<std::shared_mutex> lock(mutex_);
  auto root_page_id = GetRootPageId();
  if (root_page_id == -1) {
    return End();
  }
  Context ctx;
  auto leaf_page_id = FindLeaf(root_page_id, key, ctx);
  // low key,if high key index is size-1
  int index = 0;
  std::vector<ValueType> ignore;
  auto leaf_page_guard = bpm_->FetchPageBasic(leaf_page_id);
  auto leaf_page = leaf_page_guard.template As<LeafPage>();
  leaf_page->GetValue(key, &ignore, comparator_, index);
  return INDEXITERATOR_TYPE(leaf_page_id, index, bpm_);
}

/*
 * Input parameter is void, construct an index iterator representing the end
 * of the key/value pair in the leaf node
 * @return : index iterator
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::End() -> INDEXITERATOR_TYPE { return INDEXITERATOR_TYPE(-1, 0, bpm_); }

/**
 * @return Page id of the root of this tree
 */
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::GetRootPageId() const -> page_id_t {
  return bpm_->FetchPageBasic(header_page_id_).As<BPlusTreeHeaderPage>()->root_page_id_;
}

/*****************************************************************************
 * UTILITIES AND DEBUG
 *****************************************************************************/

/*
 * This method is used for test only
 * Read data from file and insert one by one
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::InsertFromFile(const std::string &file_name, Transaction *txn) {
  int64_t key;
  std::ifstream input(file_name);
  while (input >> key) {
    KeyType index_key;
    index_key.SetFromInteger(key);
    RID rid(key);
    Insert(index_key, rid, txn);
  }
}
/*
 * This method is used for test only
 * Read data from file and remove one by one
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::RemoveFromFile(const std::string &file_name, Transaction *txn) {
  int64_t key;
  std::ifstream input(file_name);
  while (input >> key) {
    KeyType index_key;
    index_key.SetFromInteger(key);
    Remove(index_key, txn);
  }
}

/*
 * This method is used for test only
 * Read data from file and insert/remove one by one
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::BatchOpsFromFile(const std::string &file_name, Transaction *txn) {
  int64_t key;
  char instruction;
  std::ifstream input(file_name);
  while (input) {
    input >> instruction >> key;
    RID rid(key);
    KeyType index_key;
    index_key.SetFromInteger(key);
    switch (instruction) {
      case 'i':
        Insert(index_key, rid, txn);
        break;
      case 'd':
        Remove(index_key, txn);
        break;
      default:
        break;
    }
  }
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::Print(BufferPoolManager *bpm) {
  auto root_page_id = GetRootPageId();
  auto guard = bpm->FetchPageBasic(root_page_id);
  PrintTree(guard.PageId(), guard.template As<BPlusTreePage>());
}

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::PrintTree(page_id_t page_id, const BPlusTreePage *page) {
  if (page->IsLeafPage()) {
    auto *leaf = reinterpret_cast<const LeafPage *>(page);
    std::cout << "Leaf Page: " << page_id << "\tNext: " << leaf->GetNextPageId() << std::endl;

    // Print the contents of the leaf page.
    std::cout << "Contents: ";
    for (int i = 0; i < leaf->GetSize(); i++) {
      std::cout << leaf->KeyAt(i);
      if ((i + 1) < leaf->GetSize()) {
        std::cout << ", ";
      }
    }
    std::cout << std::endl;
    std::cout << std::endl;

  } else {
    auto *internal = reinterpret_cast<const InternalPage *>(page);
    std::cout << "Internal Page: " << page_id << std::endl;

    // Print the contents of the internal page.
    std::cout << "Contents: ";
    for (int i = 0; i < internal->GetSize(); i++) {
      std::cout << internal->KeyAt(i) << ": " << internal->ValueAt(i);
      if ((i + 1) < internal->GetSize()) {
        std::cout << ", ";
      }
    }
    std::cout << std::endl;
    std::cout << std::endl;
    for (int i = 0; i < internal->GetSize(); i++) {
      auto guard = bpm_->FetchPageBasic(internal->ValueAt(i));
      PrintTree(guard.PageId(), guard.template As<BPlusTreePage>());
    }
  }
}

/**
 * This method is used for debug only, You don't need to modify
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::Draw(BufferPoolManager *bpm, const std::string &outf) {
  if (IsEmpty()) {
    LOG_WARN("Drawing an empty tree");
    return;
  }

  std::ofstream out(outf);
  out << "digraph G {" << std::endl;
  auto root_page_id = GetRootPageId();
  auto guard = bpm->FetchPageBasic(root_page_id);
  ToGraph(guard.PageId(), guard.template As<BPlusTreePage>(), out);
  out << "}" << std::endl;
  out.close();
}

/**
 * This method is used for debug only, You don't need to modify
 */
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::ToGraph(page_id_t page_id, const BPlusTreePage *page, std::ofstream &out) {
  std::string leaf_prefix("LEAF_");
  std::string internal_prefix("INT_");
  if (page->IsLeafPage()) {
    auto *leaf = reinterpret_cast<const LeafPage *>(page);
    // Print node name
    out << leaf_prefix << page_id;
    // Print node properties
    out << "[shape=plain color=green ";
    // Print data of the node
    out << "label=<<TABLE BORDER=\"0\" CELLBORDER=\"1\" CELLSPACING=\"0\" CELLPADDING=\"4\">\n";
    // Print data
    out << "<TR><TD COLSPAN=\"" << leaf->GetSize() << "\">P=" << page_id << "</TD></TR>\n";
    out << "<TR><TD COLSPAN=\"" << leaf->GetSize() << "\">"
        << "max_size=" << leaf->GetMaxSize() << ",min_size=" << leaf->GetMinSize() << ",size=" << leaf->GetSize()
        << "</TD></TR>\n";
    out << "<TR>";
    for (int i = 0; i < leaf->GetSize(); i++) {
      out << "<TD>" << leaf->KeyAt(i) << "</TD>\n";
    }
    out << "</TR>";
    // Print table end
    out << "</TABLE>>];\n";
    // Print Leaf node link if there is a next page
    if (leaf->GetNextPageId() != INVALID_PAGE_ID) {
      out << leaf_prefix << page_id << " -> " << leaf_prefix << leaf->GetNextPageId() << ";\n";
      out << "{rank=same " << leaf_prefix << page_id << " " << leaf_prefix << leaf->GetNextPageId() << "};\n";
    }
  } else {
    auto *inner = reinterpret_cast<const InternalPage *>(page);
    // Print node name
    out << internal_prefix << page_id;
    // Print node properties
    out << "[shape=plain color=pink ";  // why not?
    // Print data of the node
    out << "label=<<TABLE BORDER=\"0\" CELLBORDER=\"1\" CELLSPACING=\"0\" CELLPADDING=\"4\">\n";
    // Print data
    out << "<TR><TD COLSPAN=\"" << inner->GetSize() << "\">P=" << page_id << "</TD></TR>\n";
    out << "<TR><TD COLSPAN=\"" << inner->GetSize() << "\">"
        << "max_size=" << inner->GetMaxSize() << ",min_size=" << inner->GetMinSize() << ",size=" << inner->GetSize()
        << "</TD></TR>\n";
    out << "<TR>";
    for (int i = 0; i < inner->GetSize(); i++) {
      out << "<TD PORT=\"p" << inner->ValueAt(i) << "\">";
      if (i > 0) {
        out << inner->KeyAt(i);
      } else {
        out << " ";
      }
      out << "</TD>\n";
    }
    out << "</TR>";
    // Print table end
    out << "</TABLE>>];\n";
    // Print leaves
    for (int i = 0; i < inner->GetSize(); i++) {
      auto child_guard = bpm_->FetchPageBasic(inner->ValueAt(i));
      auto child_page = child_guard.template As<BPlusTreePage>();
      ToGraph(child_guard.PageId(), child_page, out);
      if (i > 0) {
        auto sibling_guard = bpm_->FetchPageBasic(inner->ValueAt(i - 1));
        auto sibling_page = sibling_guard.template As<BPlusTreePage>();
        if (!sibling_page->IsLeafPage() && !child_page->IsLeafPage()) {
          out << "{rank=same " << internal_prefix << sibling_guard.PageId() << " " << internal_prefix
              << child_guard.PageId() << "};\n";
        }
      }
      out << internal_prefix << page_id << ":p" << child_guard.PageId() << " -> ";
      if (child_page->IsLeafPage()) {
        out << leaf_prefix << child_guard.PageId() << ";\n";
      } else {
        out << internal_prefix << child_guard.PageId() << ";\n";
      }
    }
  }
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::DrawBPlusTree() -> std::string {
  if (IsEmpty()) {
    return "()";
  }

  PrintableBPlusTree p_root = ToPrintableBPlusTree(GetRootPageId());
  std::ostringstream out_buf;
  p_root.Print(out_buf);

  return out_buf.str();
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::ToPrintableBPlusTree(page_id_t root_id) -> PrintableBPlusTree {
  auto root_page_guard = bpm_->FetchPageBasic(root_id);
  auto root_page = root_page_guard.template As<BPlusTreePage>();
  PrintableBPlusTree proot;

  if (root_page->IsLeafPage()) {
    auto leaf_page = root_page_guard.template As<LeafPage>();
    proot.keys_ = leaf_page->ToString();
    proot.size_ = proot.keys_.size() + 4;  // 4 more spaces for indent

    return proot;
  }

  // draw internal page
  auto internal_page = root_page_guard.template As<InternalPage>();
  proot.keys_ = internal_page->ToString();
  proot.size_ = 0;
  for (int i = 0; i < internal_page->GetSize(); i++) {
    page_id_t child_id = internal_page->ValueAt(i);
    PrintableBPlusTree child_node = ToPrintableBPlusTree(child_id);
    proot.size_ += child_node.size_;
    proot.children_.push_back(child_node);
  }

  return proot;
}

// 查找给定key的叶子结点，若key不存在，返回需要插入的叶子结点id
INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::FindLeaf(page_id_t page_id, const KeyType &key, Context &ctx) const -> page_id_t {
  auto page_guard = bpm_->FetchPageBasic(page_id);
  auto page = page_guard.template AsMut<BPlusTreePage>();

  if (page->IsLeafPage()) {
    return page_id;
  }
  auto *internal_page = reinterpret_cast<InternalPage *>(page);
  // 二分右边界
  int idx = internal_page->UpperBound(key, comparator_);
  auto son_page_id = internal_page->ValueAt(idx);
  page_guard.Drop();
  ctx.parent_page_id_.emplace_back(page_id);
  return FindLeaf(son_page_id, key, ctx);
}

INDEX_TEMPLATE_ARGUMENTS
auto BPLUSTREE_TYPE::FindLeafForWrite(page_id_t page_id, const KeyType &key, Context &ctx) const -> page_id_t {
  auto page_guard = bpm_->FetchPageBasic(page_id);
  auto page = page_guard.template AsMut<BPlusTreePage>();

  if (page->IsLeafPage()) {
    return page_id;
  }
  auto *internal_page = reinterpret_cast<InternalPage *>(page);
  // 二分右边界
  int idx = internal_page->UpperBound(key, comparator_);
  auto son_page_id = internal_page->ValueAt(idx);
  page_guard.Drop();
  ctx.parent_page_id_.emplace_back(page_id);
  return FindLeafForWrite(son_page_id, key, ctx);
}

template class BPlusTree<GenericKey<4>, RID, GenericComparator<4>>;

template class BPlusTree<GenericKey<8>, RID, GenericComparator<8>>;

template class BPlusTree<GenericKey<16>, RID, GenericComparator<16>>;

template class BPlusTree<GenericKey<32>, RID, GenericComparator<32>>;

template class BPlusTree<GenericKey<64>, RID, GenericComparator<64>>;

}  // namespace bustub
